package com.atguigu.day09;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL example that counts rows per user.
 *
 * <p>The program issues three SQL statements against a streaming-mode
 * {@code StreamTableEnvironment}:
 * <ol>
 *   <li>declare the {@code clicks} table ({@code username}, {@code url}) using the
 *       {@code filesystem} connector with {@code csv} format;</li>
 *   <li>declare the {@code result_table} table ({@code username}, {@code count}) using the
 *       {@code print} connector;</li>
 *   <li>insert into {@code result_table} the per-{@code username} row count of
 *       {@code clicks}.</li>
 * </ol>
 */
public class Example6 {
    /**
     * Entry point: sets up the table environment and runs the three statements in order.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        var streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        var streamingSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        var tEnv = StreamTableEnvironment.create(streamEnv, streamingSettings);

        // Input dynamic table: rows are read from the CSV file at the configured path.
        var createClicksTable =
                "CREATE TABLE clicks (`username` STRING, `url` STRING)"
                        + "  WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = '/home/yuantuzhi/flinktutorial0926/src/main/resources/file.csv',"
                        + "  'format' = 'csv'"
                        + "  )";
        tEnv.executeSql(createClicksTable);

        // Output dynamic table: receives the query result.
        // `count` is quoted with backticks because it is also the name of a SQL function.
        var createResultTable =
                "CREATE TABLE result_table (`username` STRING, `count` BIGINT)"
                        + "  WITH('connector' = 'print')";
        tEnv.executeSql(createResultTable);

        // Query the input table and write the grouped counts into the output table.
        var insertCounts =
                "INSERT INTO result_table"
                        + "  SELECT `username`, COUNT(`username`) AS `count` FROM clicks GROUP BY `username`";
        tEnv.executeSql(insertCounts);
    }
}
